package com.zhang.hadoop.flink.base;

import com.zhang.hadoop.flink.base.Event;
import org.apache.flink.streaming.api.functions.source.SourceFunction;

import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.util.Random;

/**
 * Unbounded mock source that emits one randomly generated click {@link Event}
 * per second until {@link #cancel()} is called.
 *
 * <p>Each event combines a user and a URL picked uniformly at random from two
 * fixed candidate lists, stamped with the current epoch time in milliseconds.
 *
 * @author zhang yufei
 * @createTime 2022/5/14 16:35
 */
public class ClickSource implements SourceFunction<Event> {

    /** Candidate user names for generated events. */
    private static final String[] USERS = {"欧阳慧", "陈玉萍", "杨丹", "王静茹"};

    /** Candidate URLs for generated events. */
    private static final String[] URLS = {"/阴道", "/肛门", "/尿道", "/丝袜足交"};

    /** Pause between two consecutive events, in milliseconds. */
    private static final long EMIT_INTERVAL_MILLIS = 1000L;

    /**
     * Loop flag cleared by {@link #cancel()}. Declared {@code volatile} because
     * {@code cancel()} may be invoked from a different thread than the one
     * executing {@link #run(SourceContext)}; without it the write is not
     * guaranteed to become visible to the emitting loop.
     */
    private volatile boolean running = true;

    private final Random random = new Random();

    /**
     * Emits randomly generated events until the source is cancelled.
     *
     * @param sourceContext context used to emit the generated events
     * @throws Exception if the emitting thread is interrupted while sleeping
     *                   between events, or if emitting fails
     */
    @Override
    public void run(SourceContext<Event> sourceContext) throws Exception {
        // Keep generating data until cancel() clears the flag.
        while (running) {
            String user = USERS[random.nextInt(USERS.length)];
            String url = URLS[random.nextInt(URLS.length)];
            // Epoch milliseconds are time-zone independent. The previous code
            // read the local wall-clock time and interpreted it as UTC+8, which
            // is only correct when the JVM default zone happens to be UTC+8.
            long timestamp = System.currentTimeMillis();
            sourceContext.collect(new Event(user, url, timestamp));
            Thread.sleep(EMIT_INTERVAL_MILLIS);
        }
    }

    /**
     * Requests the emitting loop in {@link #run(SourceContext)} to stop; the
     * loop exits the next time it checks the flag.
     */
    @Override
    public void cancel() {
        running = false;
    }
}
